/*
 * Copyright (c) 2020 - present, Inspur Genersoft Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.iec.edp.caf.msu.client.discovery;

import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceList;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.iec.edp.caf.common.JSONSerializer;
import io.iec.edp.caf.commons.runtime.CafEnvironment;
import io.iec.edp.caf.commons.utils.StringUtils;
import io.iec.edp.caf.msu.api.client.ServiceDiscovery;
import io.iec.edp.caf.msu.api.entity.MsuProperties;
import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo;
import io.iec.edp.caf.msu.client.exception.ServiceUnitNotFoundException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.Environment;

import java.util.*;

/**
 * 服务中心-服务发现（k8s版）
 *
 * @author Leon Huo
 * @date 2023-04-26 14:40
 */
@Slf4j
public class KubernetesDiscoveryImpl implements ServiceDiscovery {

    private boolean enableSSL = false;

    private MsuProperties configuration;

    private KubernetesClient client;

    public KubernetesDiscoveryImpl(MsuProperties configuration) {
        this.configuration = configuration;
        this.enableSSL = enableSSL();
    }

    private String namespace = "";

    @Override
    public String discover(String su, HashMap<String, String> eventContext) {
        //k8s namespace must toLowerCase
        this.namespace = this.configuration.getApplicationName().toLowerCase();

        log.info("ServiceCenter(K8s) Start to discover su of namespace [{}] and su [{}]", this.namespace, su.toLowerCase());

        //discovery service
        ServiceList list = getK8sClient().services()
                .inNamespace(this.namespace)
                .withLabelSelector(new LabelSelector(null, Collections.singletonMap("msu", su.toLowerCase())))
                .list();

        if (list != null && list.getItems().size() > 0) {
            //service
            Service service = list.getItems().get(0);
            if (service != null) {
                String name = service.getMetadata().getName();

                //query endpoints of service
                Endpoints endpoints = this.client.endpoints().inNamespace(this.namespace).withName(name).get();
                if (endpoints != null && endpoints.getSubsets() != null && endpoints.getSubsets().size() > 0) {
                    //port
                    int port = CafEnvironment.getPort();
                    if (service.getSpec().getPorts().get(0) != null) {
                        port = service.getSpec().getPorts().get(0).getPort();
                    }

                    String address = this.enableSSL ? "https://" : "http://" + name + ":" + port;
                    log.info("ServiceCenter(K8s) Finish to discover su [{}], URL is [{}]", su.toLowerCase(), address);

                    return address;
                }
            }
        }

        //cannot find service, throw ServiceUnitNotFoundException
        throw new ServiceUnitNotFoundException("ServiceCenter(K8s) Cannot find su [" + su.toLowerCase() + "]");
    }

    @Override
    public List<String> discoverAll(String su, HashMap<String, String> eventContext) {
        List<String> urls = new ArrayList<>();
        urls.add(discover(su, eventContext));

        return urls;
    }

    public List<ServiceUnitInfo> getEnabledServiceUnitInfo() {
        //k8s namespace must toLowerCase
        this.namespace = this.configuration.getApplicationName().toLowerCase();

        List<ServiceUnitInfo> infos = null;

        //discovery service
        ServiceList list = getK8sClient().services()
                .inNamespace(this.namespace)
                .withLabelSelector(new LabelSelector(null, Collections.singletonMap("namespace", this.namespace)))
                .list();
        if (list != null && list.getItems() != null && list.getItems().size() > 0) {
            infos = new ArrayList<>();
            //forEach services
            for (Service service : list.getItems()) {
                String name = service.getMetadata().getName();

                //query endpoints of service to ensure svc is useful
                Endpoints endpoints = this.client.endpoints().inNamespace(this.namespace).withName(name).get();
                if (endpoints != null && endpoints.getSubsets() != null && endpoints.getSubsets().size() > 0) {
                    String description = service.getMetadata().getAnnotations().get("description");
                    ServiceUnitInfo info = JSONSerializer.deserialize(description, ServiceUnitInfo.class);
                    infos.add(info);
                }
            }
        }

        return infos;
    }

    /**
     * 是否开启https
     */
    private boolean enableSSL() {
        Environment environment = CafEnvironment.getEnvironment();
        return !StringUtils.isEmpty(environment.getProperty("server.ssl.key-store"));
    }

    /**
     * initialize k8s client
     */
    private KubernetesClient getK8sClient() {
        if (this.client == null) {
            //initialize k8s client
            log.info("init k8s client");
            this.client = new DefaultKubernetesClient();
            log.info("finish to init k8s client");
        }

        return this.client;
    }

}
